[Job](Fix)Improve Event Publishing with Timeout #45103
### Summary:
This PR refines the publishEvent method to improve event publishing reliability by introducing a timeout mechanism and enhanced logging. The changes allow for a more responsive system when attempting to publish events to the disruptor, especially in cases where the ring buffer may not have sufficient capacity at the time.

#### Timeout Implementation:
A 1-second timeout (in nanoseconds) is set, after which the event publishing attempt will stop if the required capacity is not available. The timeout is tracked using System.nanoTime() for precise elapsed time measurement.

#### Remaining Capacity Check:
The method checks if the remainingCapacity() of the ring buffer is greater than 1 (this can be adjusted based on your capacity requirements). If enough capacity is available, the event is published; otherwise, it waits and retries.
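The check-wait-retry loop described above can be sketched roughly as follows. This is an illustration only, not the code from this PR: `TimedPublishSketch` is a hypothetical class, an `ArrayBlockingQueue` stands in for the Disruptor ring buffer, and the 10 ms pause between retries is an assumption, since the description does not say how long the method waits.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedPublishSketch {
    // Hypothetical stand-in for the ring buffer; not the Doris TaskDisruptor.
    private final ArrayBlockingQueue<String> buffer;
    private final long timeoutNanos;

    public TimedPublishSketch(int capacity, long timeoutNanos) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.timeoutNanos = timeoutNanos;
    }

    /** Returns true if the event was published before the timeout elapsed. */
    public boolean publish(String event) {
        long start = System.nanoTime();
        // Compare elapsed time by subtraction, as System.nanoTime() requires.
        while (System.nanoTime() - start < timeoutNanos) {
            // Same threshold as the description: strictly more than 1 free slot.
            if (buffer.remainingCapacity() > 1 && buffer.offer(event)) {
                return true;
            }
            try {
                // Assumed back-off interval before the next capacity check.
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // Timeout reached without publishing the event.
        return false;
    }

    public int size() {
        return buffer.size();
    }
}
```

With a capacity of 3 and a 50 ms timeout, the first two calls to `publish` succeed and the third returns false, because one free slot does not satisfy the "greater than 1" check.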
run buildall
TPC-H: Total hot run time: 40121 ms
TPC-DS: Total hot run time: 197667 ms
ClickBench: Total hot run time: 32.64 s
return true;
// Timeout reached without publishing the event
LOG.warn("Failed to publish event within the specified timeout (1 second)."
In the log, shouldn't it be the current size of the buffer, not the remaining size of the buffer?
yes...done
run buildall
TPC-H: Total hot run time: 40031 ms
TPC-DS: Total hot run time: 197618 ms
ClickBench: Total hot run time: 33.23 s
run buildall
TPC-H: Total hot run time: 39932 ms
TPC-DS: Total hot run time: 197458 ms
ClickBench: Total hot run time: 32.36 s
fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
…sruptor.java Co-authored-by: morrySnow <[email protected]>
run buildall
TPC-H: Total hot run time: 39984 ms
TPC-DS: Total hot run time: 196827 ms
ClickBench: Total hot run time: 32.89 s
run buildall
TPC-H: Total hot run time: 39946 ms
ClickBench: Total hot run time: 33.32 s
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
// Check if there is enough remaining capacity in the ring buffer
// Adjusting to check if the required capacity is available (instead of hardcoding 1)
if (disruptor.getRingBuffer().remainingCapacity() > 1) {
// Publish the event if there is enough capacity
Could there be a concurrency problem here? The check may see a remaining capacity greater than 1, but by the time the event is published there may be no capacity left.
Currently, only the time wheel is in use, and since the time wheel itself is single-threaded, there are no issues.
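The reply above relies on there being a single producer. If more producers were ever added, a separate capacity check followed by a publish could race, as the reviewer describes. A minimal sketch of the alternative, where the capacity check and the insert happen in one atomic call, is shown below. It uses `java.util.concurrent.ArrayBlockingQueue.offer` as a stand-in, and `AtomicOfferSketch` is a hypothetical class. The Disruptor `RingBuffer` exposes `tryPublishEvent` methods for a similar purpose, but whether they suit TaskDisruptor is not something this thread establishes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicOfferSketch {
    /**
     * Starts `producers` threads that each try to insert one element into a
     * buffer of size `capacity`. Returns {accepted, rejected}.
     */
    public static int[] run(int capacity, int producers) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger accepted = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(producers);

        for (int i = 0; i < producers; i++) {
            final int id = i;
            new Thread(() -> {
                // offer() checks capacity and inserts atomically, so there is
                // no window between "is there room?" and "insert".
                if (buffer.offer(id)) {
                    accepted.incrementAndGet();
                } else {
                    rejected.incrementAndGet();
                }
                done.countDown();
            }).start();
        }

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {accepted.get(), rejected.get()};
    }
}
```

Because nothing consumes from the buffer in this sketch, exactly `capacity` offers are accepted regardless of thread scheduling, and the rest are rejected without throwing.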
disruptor.getRingBuffer().publishEvent(eventTranslator, args);
if (LOG.isDebugEnabled()) {
LOG.debug("publishEvent success,the remaining buffer size is {}",
disruptor.getRingBuffer().remainingCapacity());
What is the purpose of 'remainingCapacity'?
Simply to add debug logs.
public boolean addTask(AbstractTask task) {
try {
executor.execute(() -> runTask(task)); // 直接提交任务
Please write this code comment in English (it reads "submit the task directly").
run buildall
TPC-H: Total hot run time: 40061 ms
TPC-DS: Total hot run time: 191201 ms
ClickBench: Total hot run time: 33.22 s
PR approved by at least one committer and no changes requested.
…45300) Cherry-picked from #45103 Co-authored-by: Calvin Kirs <[email protected]>
…45299) Cherry-picked from #45103 Co-authored-by: Calvin Kirs <[email protected]>